{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "weighted-albany",
   "metadata": {},
   "source": [
    "# ISAT demo using Sentinel 2 metadata for the self-intersection task\n",
    "# I-SAT has independent spatial join and temporal filtering\n",
    "## This demo could be used to find Spatio-temporal intersection for Sentinel-2 and itself"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "varying-elevation",
   "metadata": {},
   "source": [
    "## Input date range for query, we have dataset collected in 2020 in this demo"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "polyphonic-concern",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "hairy-ethiopia",
   "metadata": {},
   "outputs": [],
   "source": [
    "# date in the format of \"yyy-MM-dd\", results include both start date and end date\n",
    "\n",
    "# The hour delay variable represents how many hours apart the two datasets are considered to be temporally intersected\n",
    "# E.g. if hour_delay = 6, for 2020-06-26 12:00:00, timestamps between 2020-06-26 06:00:00 and 2020-06-26 18:00:00 are considered to be temporally intersected\n",
    "start_date = '2020-10-01'\n",
    "end_date = '2020-12-31'\n",
    "# time_index_hour_length = 72\n",
    "hour_delay = 6 ### test hour for 4 days, \n",
    "duration_month = 3 # for export"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "identical-invention",
   "metadata": {},
   "source": [
    "## please input the potential area of interest, we have \"Beaufort_Sea\" and \"Wandel_Sea\" in in this demo, could leave blank if not using AOI"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "dependent-spider",
   "metadata": {},
   "outputs": [],
   "source": [
    "Paoi = 'Wandel_Sea'"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "embedded-establishment",
   "metadata": {},
   "source": [
    "## First, setup the Spark and Sedona environment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "attractive-surface",
   "metadata": {},
   "outputs": [],
   "source": [
    "import findspark\n",
    "#findspark.init() "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "accepted-albania",
   "metadata": {},
   "outputs": [],
   "source": [
    "SPARK_HOME='/opt/cloudera/parcels/CDH/lib/spark'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "healthy-things",
   "metadata": {},
   "outputs": [],
   "source": [
    "findspark.init(SPARK_HOME)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "aging-elite",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/home/zhangp/.conda/envs/py37_environment/lib/python3.7/site-packages/geopandas/_compat.py:58: UserWarning: The installed version of PyGEOS is too old (0.6 installed, 0.8 required), and thus GeoPandas will not use PyGEOS.\n",
      "  UserWarning,\n"
     ]
    }
   ],
   "source": [
    "import json\n",
    "import os\n",
    "import codecs\n",
    "import subprocess\n",
    "#from hdfs import InsecureClient\n",
    "import numpy as np\n",
    "#from pyspark import SparkContext\n",
    "from pyspark import SQLContext\n",
    "from pyspark.sql import Row\n",
    "from pyspark.sql import functions as F\n",
    "from pyspark.sql.types import *\n",
    "import rtree\n",
    "from pyspark.sql import Window\n",
    "#import igraph\n",
    "#from igraph import Graph\n",
    "import geofeather"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "polyphonic-pride",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "from pyspark import StorageLevel\n",
    "import geopandas as gpd\n",
    "import pandas as pd\n",
    "from pyspark.sql.types import StructType\n",
    "from pyspark.sql.types import StructField\n",
    "from pyspark.sql.types import StringType\n",
    "from pyspark.sql.types import LongType\n",
    "from shapely.geometry import Point\n",
    "from shapely.geometry import Polygon\n",
    "\n",
    "from sedona.register import SedonaRegistrator\n",
    "from sedona.core.SpatialRDD import SpatialRDD\n",
    "from sedona.core.SpatialRDD import PointRDD\n",
    "from sedona.core.SpatialRDD import PolygonRDD\n",
    "from sedona.core.SpatialRDD import LineStringRDD\n",
    "from sedona.core.enums import FileDataSplitter\n",
    "from sedona.utils.adapter import Adapter\n",
    "from sedona.core.spatialOperator import KNNQuery\n",
    "from sedona.core.spatialOperator import JoinQuery\n",
    "from sedona.core.spatialOperator import JoinQueryRaw\n",
    "from sedona.core.spatialOperator import RangeQuery\n",
    "from sedona.core.spatialOperator import RangeQueryRaw\n",
    "from sedona.core.formatMapper.shapefileParser import ShapefileReader\n",
    "from sedona.core.formatMapper import WkbReader\n",
    "from sedona.core.formatMapper import WktReader\n",
    "from sedona.core.formatMapper import GeoJsonReader\n",
    "from sedona.sql.types import GeometryType\n",
    "from sedona.core.enums import GridType\n",
    "from sedona.core.SpatialRDD import RectangleRDD\n",
    "from sedona.core.enums import IndexType\n",
    "from sedona.core.geom.envelope import Envelope\n",
    "from sedona.utils import SedonaKryoRegistrator, KryoSerializer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "municipal-ownership",
   "metadata": {},
   "outputs": [],
   "source": [
    "os.environ['PYSPARK_PYTHON'] = \"./environment/bin/python\"\n",
    "os.environ['YARN_CONF_DIR'] = \"/opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "challenging-motor",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession \\\n",
    ".builder \\\n",
    ".appName(\"test_test_1\") \\\n",
    ".master('yarn') \\\n",
    ".config(\"spark.serializer\", KryoSerializer.getName) \\\n",
    ".config(\"spark.kryo.registrator\", SedonaKryoRegistrator.getName) \\\n",
    ".config('spark.jars','sedona-core-2.4_2.11-1.0.0-incubating.jar,sedona-sql-2.4_2.11-1.0.0-incubating.jar,sedona-python-adapter-2.4_2.11-1.0.0-incubating.jar,sedona-viz-2.4_2.11-1.0.0-incubating.jar,geotools-wrapper-geotools-24.0.jar') \\\n",
    ".config('spark.executor.memory', '20g') \\\n",
    ".config('spark.driver.memory', '10g') \\\n",
    ".config('spark.sql.shuffle.partitions', 6144) \\\n",
    ".config('spark.executor.instances', '24') \\\n",
    ".config('spark.executor.cores', '5') \\\n",
    ".config('spark.rpc.message.maxSize', '1024') \\\n",
    ".config('spark.yarn.dist.archives', 'environment.tar.gz#environment') \\\n",
    ".getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "disabled-scanning",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "SedonaRegistrator.registerAll(spark)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "numerical-spank",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Extract data within potential time slots\n",
    "import time\n",
    "import datetime\n",
    "\n",
    "start_timestamp = time.mktime(datetime.datetime.strptime(start_date, \"%Y-%m-%d\").timetuple())\n",
    "start_timestamp = start_timestamp - hour_delay * 3600 # for potential time delay\n",
    "\n",
    "end_timestamp = time.mktime(datetime.datetime.strptime(end_date, \"%Y-%m-%d\").timetuple())\n",
    "end_timestamp = end_timestamp + hour_delay * 3600 # for potential time delay"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "funded-intelligence",
   "metadata": {},
   "source": [
    "## Second, read datasets"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "available-textbook",
   "metadata": {},
   "source": [
    "### read Sentinel data, Spark have different types of input methods, here we read from Hadoop file system, as our Spark 2.4 is built based on Hadoop Yarn\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "violent-proposition",
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "sentinel_df = spark.read.option(\"header\",True).options(delimiter='|').csv(\"meta_s2_2020_wkt_cloudcover.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "emerging-lodge",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- _c0: string (nullable = true)\n",
      " |-- s2_index: string (nullable = true)\n",
      " |-- timestamp: string (nullable = true)\n",
      " |-- cloudcover_assessment: string (nullable = true)\n",
      " |-- wkt_geo: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "billion-poison",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "french-indianapolis",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+---------------------+--------------------+\n",
      "|_c0|            s2_index|           timestamp|cloudcover_assessment|             wkt_geo|\n",
      "+---+--------------------+--------------------+---------------------+--------------------+\n",
      "|  0|20200930T235749_2...|2020-10-01T00:00:...|            93.659861|POLYGON ((154.925...|\n",
      "|  1|20200930T235749_2...|2020-10-01T00:00:...|            99.919739|POLYGON ((155.851...|\n",
      "|  2|20200930T235749_2...|2020-10-01T00:00:...|             98.79224|POLYGON ((154.841...|\n",
      "|  3|20200930T235749_2...|2020-10-01T00:00:...|            88.397203|POLYGON ((156.773...|\n",
      "|  4|20200930T235749_2...|2020-10-01T00:00:...|            93.500239|POLYGON ((155.752...|\n",
      "|  5|20200930T235749_2...|2020-10-01T00:00:...|            88.250346|POLYGON ((156.870...|\n",
      "|  6|20200930T235749_2...|2020-10-01T00:00:...|            81.360003|POLYGON ((156.237...|\n",
      "|  7|20201001T000249_2...|2020-10-01T00:09:...|            11.674878|POLYGON ((145.020...|\n",
      "|  8|20201001T000249_2...|2020-10-01T00:08:...|            32.067495|POLYGON ((145.605...|\n",
      "|  9|20201001T000249_2...|2020-10-01T00:08:...|            46.908053|POLYGON ((145.917...|\n",
      "| 10|20201001T000249_2...|2020-10-01T00:09:...|             5.032759|POLYGON ((146.714...|\n",
      "| 11|20201001T000249_2...|2020-10-01T00:09:...|            21.563988|POLYGON ((147.122...|\n",
      "| 12|20201001T000249_2...|2020-10-01T00:08:...|            46.979577|POLYGON ((145.783...|\n",
      "| 13|20201001T000249_2...|2020-10-01T00:08:...|            41.197359|POLYGON ((147.116...|\n",
      "| 14|20201001T000249_2...|2020-10-01T00:08:...|            11.354851|POLYGON ((147.115...|\n",
      "| 15|20201001T000249_2...|2020-10-01T00:08:...|            11.036312|POLYGON ((146.241...|\n",
      "| 16|20201001T000249_2...|2020-10-01T00:09:...|             14.95059|POLYGON ((146.999...|\n",
      "| 17|20201001T000249_2...|2020-10-01T00:09:...|            18.875763|POLYGON ((148.354...|\n",
      "| 18|20201001T000249_2...|2020-10-01T00:08:...|            24.837191|POLYGON ((146.999...|\n",
      "| 19|20201001T000249_2...|2020-10-01T00:08:...|            27.403476|POLYGON ((146.999...|\n",
      "+---+--------------------+--------------------+---------------------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_df.show(20)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "furnished-emission",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import DoubleType\n",
    "sentinel_df = sentinel_df.withColumn('date_date_type', F.to_timestamp(F.udf(lambda x: x[:10] + ' ' + x[11:19])(F.col('timestamp')), 'yyyy-MM-dd HH:mm:ss')).\\\n",
    "withColumn('cloudcover_assessment', F.col('cloudcover_assessment').cast(DoubleType()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "alike-appearance",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "aggregate-tract",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+---------------------+--------------------+-------------------+\n",
      "|_c0|            s2_index|           timestamp|cloudcover_assessment|             wkt_geo|     date_date_type|\n",
      "+---+--------------------+--------------------+---------------------+--------------------+-------------------+\n",
      "|  0|20200930T235749_2...|2020-10-01T00:00:...|            93.659861|POLYGON ((154.925...|2020-10-01 00:00:09|\n",
      "|  1|20200930T235749_2...|2020-10-01T00:00:...|            99.919739|POLYGON ((155.851...|2020-10-01 00:00:13|\n",
      "|  2|20200930T235749_2...|2020-10-01T00:00:...|             98.79224|POLYGON ((154.841...|2020-10-01 00:00:08|\n",
      "|  3|20200930T235749_2...|2020-10-01T00:00:...|            88.397203|POLYGON ((156.773...|2020-10-01 00:00:13|\n",
      "|  4|20200930T235749_2...|2020-10-01T00:00:...|            93.500239|POLYGON ((155.752...|2020-10-01 00:00:05|\n",
      "|  5|20200930T235749_2...|2020-10-01T00:00:...|            88.250346|POLYGON ((156.870...|2020-10-01 00:00:12|\n",
      "|  6|20200930T235749_2...|2020-10-01T00:00:...|            81.360003|POLYGON ((156.237...|2020-10-01 00:00:04|\n",
      "|  7|20201001T000249_2...|2020-10-01T00:09:...|            11.674878|POLYGON ((145.020...|2020-10-01 00:09:07|\n",
      "|  8|20201001T000249_2...|2020-10-01T00:08:...|            32.067495|POLYGON ((145.605...|2020-10-01 00:08:56|\n",
      "|  9|20201001T000249_2...|2020-10-01T00:08:...|            46.908053|POLYGON ((145.917...|2020-10-01 00:08:43|\n",
      "+---+--------------------+--------------------+---------------------+--------------------+-------------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_df.show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "centered-roberts",
   "metadata": {},
   "outputs": [],
   "source": [
    "# have a function to extract min latitude, could be done by ST_YMin, but not support by Sedona of this version\n",
    "import re\n",
    "def extract_y_min(wkt):\n",
    "    return min([float(re.search('\\d+\\.\\d+\\s(\\-*\\d+\\.\\d+)', i).group(1)) for i in re.findall('\\d+\\.\\d+\\s\\-*\\d+\\.\\d+', wkt)])\n",
    "extract_y_min_udf = F.udf(extract_y_min)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "swedish-neighborhood",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- _c0: string (nullable = true)\n",
      " |-- s2_index: string (nullable = true)\n",
      " |-- timestamp: string (nullable = true)\n",
      " |-- cloudcover_assessment: double (nullable = true)\n",
      " |-- wkt_geo: string (nullable = true)\n",
      " |-- date_date_type: timestamp (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "different-upgrade",
   "metadata": {},
   "outputs": [],
   "source": [
    "# have a function to extract min latitude, could be done by ST_YMin, but not support by Sedona of this version\n",
    "import re\n",
    "def extract_y_max(wkt):\n",
    "    return max([float(re.search('\\d+\\.\\d+\\s(\\-*\\d+\\.\\d+)', i).group(1)) for i in re.findall('\\d+\\.\\d+\\s\\-*\\d+\\.\\d+', wkt)])\n",
    "extract_y_max_udf = F.udf(extract_y_max)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "viral-tutorial",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1785709"
      ]
     },
     "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sentinel_df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "prepared-subscription",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_df = sentinel_df.withColumn('min_lat', extract_y_min_udf(F.col('wkt_geo'))).withColumn('max_lat', extract_y_max_udf(F.col('wkt_geo')))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "id": "separated-radical",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+--------------------+--------------------+---------------------+--------------------+-------------------+-------------------+-------------------+\n",
      "|_c0|            s2_index|           timestamp|cloudcover_assessment|             wkt_geo|     date_date_type|            min_lat|            max_lat|\n",
      "+---+--------------------+--------------------+---------------------+--------------------+-------------------+-------------------+-------------------+\n",
      "|  0|20200930T235749_2...|2020-10-01T00:00:...|            93.659861|POLYGON ((154.925...|2020-10-01 00:00:09|-12.485191514367218|-11.752917740644865|\n",
      "|  1|20200930T235749_2...|2020-10-01T00:00:...|            99.919739|POLYGON ((155.851...|2020-10-01 00:00:13|-12.682261795097798| -12.64864668521512|\n",
      "|  2|20200930T235749_2...|2020-10-01T00:00:...|             98.79224|POLYGON ((154.841...|2020-10-01 00:00:08|-12.682785600627383| -11.74526276670583|\n",
      "|  3|20200930T235749_2...|2020-10-01T00:00:...|            88.397203|POLYGON ((156.773...|2020-10-01 00:00:13|-12.911348915578905|-12.637212565514444|\n",
      "|  4|20200930T235749_2...|2020-10-01T00:00:...|            93.500239|POLYGON ((155.752...|2020-10-01 00:00:05| -12.73457117365686|-11.734668563942291|\n",
      "|  5|20200930T235749_2...|2020-10-01T00:00:...|            88.250346|POLYGON ((156.870...|2020-10-01 00:00:12| -12.93735723426378| -12.64958923896515|\n",
      "|  6|20200930T235749_2...|2020-10-01T00:00:...|            81.360003|POLYGON ((156.237...|2020-10-01 00:00:04|-12.744126646132784| -11.74613615455164|\n",
      "|  7|20201001T000249_2...|2020-10-01T00:09:...|            11.674878|POLYGON ((145.020...|2020-10-01 00:09:07| -44.09399396634854| -43.33953136364434|\n",
      "|  8|20201001T000249_2...|2020-10-01T00:08:...|            32.067495|POLYGON ((145.605...|2020-10-01 00:08:56|-43.435394978882336| -42.44357827033972|\n",
      "|  9|20201001T000249_2...|2020-10-01T00:08:...|            46.908053|POLYGON ((145.917...|2020-10-01 00:08:43|-42.534871781265494| -41.62609846326433|\n",
      "| 10|20201001T000249_2...|2020-10-01T00:09:...|             5.032759|POLYGON ((146.714...|2020-10-01 00:09:13| -44.32474894686195| -44.25242976594088|\n",
      "| 11|20201001T000249_2...|2020-10-01T00:09:...|            21.563988|POLYGON ((147.122...|2020-10-01 00:09:05| -44.32528752744198| -43.34610468640119|\n",
      "| 12|20201001T000249_2...|2020-10-01T00:08:...|            46.979577|POLYGON ((145.783...|2020-10-01 00:08:52| -43.44076564154274| -42.44560548535187|\n",
      "| 13|20201001T000249_2...|2020-10-01T00:08:...|            41.197359|POLYGON ((147.116...|2020-10-01 00:08:38| -42.54007687182902| -41.54634263114919|\n",
      "| 14|20201001T000249_2...|2020-10-01T00:08:...|            11.354851|POLYGON ((147.115...|2020-10-01 00:08:23| -41.63978645282975| -40.64847798917236|\n",
      "| 15|20201001T000249_2...|2020-10-01T00:08:...|            11.036312|POLYGON ((146.241...|2020-10-01 00:08:08|-40.738813944114995|-39.748937682350146|\n",
      "| 16|20201001T000249_2...|2020-10-01T00:09:...|             14.95059|POLYGON ((146.999...|2020-10-01 00:09:11|-44.582750840337404| -44.24451507280735|\n",
      "| 17|20201001T000249_2...|2020-10-01T00:09:...|            18.875763|POLYGON ((148.354...|2020-10-01 00:09:03|-44.341237163495556|-43.344747653656526|\n",
      "| 18|20201001T000249_2...|2020-10-01T00:08:...|            24.837191|POLYGON ((146.999...|2020-10-01 00:08:48| -43.44082893892228| -42.44429023997796|\n",
      "| 19|20201001T000249_2...|2020-10-01T00:08:...|            27.403476|POLYGON ((146.999...|2020-10-01 00:08:34| -42.54013827905396|-41.543683853244104|\n",
      "+---+--------------------+--------------------+---------------------+--------------------+-------------------+-------------------+-------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "id": "dressed-frontier",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1785709"
      ]
     },
     "execution_count": 24,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sentinel_df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "id": "contained-shame",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_df = sentinel_df.filter(F.col('cloudcover_assessment') < F.lit(50)).filter((F.col('min_lat') > F.lit(60)) | (F.col('max_lat') < F.lit(-60)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "id": "wound-woman",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "166943"
      ]
     },
     "execution_count": 26,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sentinel_df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "id": "invalid-better",
   "metadata": {},
   "outputs": [],
   "source": [
    "# extract spatial information using WKT format footprint\n",
    "sentinel_df.createOrReplaceTempView(\"sentinel_df\")\n",
    "sentinel_sedona = spark.sql(\"select ST_GeomFromWKT(sentinel_df.wkt_geo) as geometry, sentinel_df.s2_index as s2_index, sentinel_df.date_date_type as date_date_type from sentinel_df\")\n",
    "sentinel_sedona.createOrReplaceTempView(\"sentinel_sedona\")\n",
    "sentinel_sedona = sentinel_sedona.withColumnRenamed('date_date_type', 'Sentinel_date_date_type')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "id": "scheduled-yellow",
   "metadata": {},
   "outputs": [],
   "source": [
    "# sentinel_sedona.createOrReplaceTempView(\"sentinel_sedona\")\n",
    "# sentinel_sedona = spark.sql('select ST_Envelope(sentinel_sedona.geometry) as envelope, * from sentinel_sedona')\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "id": "north-transparency",
   "metadata": {},
   "outputs": [],
   "source": [
    "# sentinel_sedona.createOrReplaceTempView(\"sentinel_sedona\")\n",
    "# sentinel_sedona = spark.sql('select ST_Y(sentinel_sedona.geometry) as test_Y, * from sentinel_sedona')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "id": "canadian-reflection",
   "metadata": {},
   "outputs": [],
   "source": [
    "# test_sentinel_pd = sentinel_sedona.limit(5).toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "id": "polish-revolution",
   "metadata": {},
   "outputs": [],
   "source": [
    "# test_sentinel_pd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "id": "premier-battlefield",
   "metadata": {},
   "outputs": [],
   "source": [
    "# str(test_sentinel_pd.loc[0, 'envelope'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "id": "digital-secondary",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-----------------------+\n",
      "|            geometry|            s2_index|Sentinel_date_date_type|\n",
      "+--------------------+--------------------+-----------------------+\n",
      "|POLYGON ((177.977...|20201001T002611_2...|    2020-10-01 00:27:04|\n",
      "|POLYGON ((-179.62...|20201001T002611_2...|    2020-10-01 00:26:30|\n",
      "|POLYGON ((169.906...|20201001T002611_2...|    2020-10-01 00:27:49|\n",
      "|POLYGON ((170.004...|20201001T002611_2...|    2020-10-01 00:27:35|\n",
      "|POLYGON ((170.421...|20201001T002611_2...|    2020-10-01 00:27:24|\n",
      "|POLYGON ((169.867...|20201001T002611_2...|    2020-10-01 00:27:56|\n",
      "|POLYGON ((168.665...|20201001T002611_2...|    2020-10-01 00:27:54|\n",
      "|POLYGON ((171.228...|20201001T002611_2...|    2020-10-01 00:27:46|\n",
      "|POLYGON ((168.977...|20201001T002611_2...|    2020-10-01 00:27:31|\n",
      "|POLYGON ((169.983...|20201001T002611_2...|    2020-10-01 00:27:18|\n",
      "+--------------------+--------------------+-----------------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Extract data within potential time slots\n",
    "\n",
    "sentinel_sedona = sentinel_sedona.filter(F.unix_timestamp(\"Sentinel_date_date_type\") <= end_timestamp).filter(F.unix_timestamp(\"Sentinel_date_date_type\") >= start_timestamp)\n",
    "sentinel_sedona.createOrReplaceTempView(\"sentinel_sedona\")\n",
    "\n",
    "sentinel_sedona.show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "intense-review",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "reserved-saturn",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "id": "comparable-occasion",
   "metadata": {},
   "source": [
    "## Third, spatio-temporal join Sentinel dataset and Sentinel dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "id": "marine-leadership",
   "metadata": {},
   "outputs": [],
   "source": [
    "# 3.1 join the two Sedona dataframe according the timestamps, hour level at now, consider time delay"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "id": "hungarian-death",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_sedona = sentinel_sedona.withColumn('sentinel_max_timestamp', (F.unix_timestamp(\"Sentinel_date_date_type\") + hour_delay * 3600)).\\\n",
    "withColumn('sentinel_min_timestamp', (F.unix_timestamp(\"Sentinel_date_date_type\") - hour_delay * 3600))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "id": "noted-geography",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+--------------------+-----------------------+----------------------+----------------------+\n",
      "|            geometry|            s2_index|Sentinel_date_date_type|sentinel_max_timestamp|sentinel_min_timestamp|\n",
      "+--------------------+--------------------+-----------------------+----------------------+----------------------+\n",
      "|POLYGON ((177.977...|20201001T002611_2...|    2020-10-01 00:27:04|            1601548024|            1601504824|\n",
      "|POLYGON ((-179.62...|20201001T002611_2...|    2020-10-01 00:26:30|            1601547990|            1601504790|\n",
      "|POLYGON ((169.906...|20201001T002611_2...|    2020-10-01 00:27:49|            1601548069|            1601504869|\n",
      "|POLYGON ((170.004...|20201001T002611_2...|    2020-10-01 00:27:35|            1601548055|            1601504855|\n",
      "|POLYGON ((170.421...|20201001T002611_2...|    2020-10-01 00:27:24|            1601548044|            1601504844|\n",
      "|POLYGON ((169.867...|20201001T002611_2...|    2020-10-01 00:27:56|            1601548076|            1601504876|\n",
      "|POLYGON ((168.665...|20201001T002611_2...|    2020-10-01 00:27:54|            1601548074|            1601504874|\n",
      "|POLYGON ((171.228...|20201001T002611_2...|    2020-10-01 00:27:46|            1601548066|            1601504866|\n",
      "|POLYGON ((168.977...|20201001T002611_2...|    2020-10-01 00:27:31|            1601548051|            1601504851|\n",
      "|POLYGON ((169.983...|20201001T002611_2...|    2020-10-01 00:27:18|            1601548038|            1601504838|\n",
      "|POLYGON ((170.999...|20201001T002611_2...|    2020-10-01 00:27:40|            1601548060|            1601504860|\n",
      "|POLYGON ((170.999...|20201001T002611_2...|    2020-10-01 00:27:26|            1601548046|            1601504846|\n",
      "|POLYGON ((170.999...|20201001T002611_2...|    2020-10-01 00:27:12|            1601548032|            1601504832|\n",
      "|POLYGON ((171.076...|20201001T002611_2...|    2020-10-01 00:26:58|            1601548018|            1601504818|\n",
      "|POLYGON ((173.546...|20201001T002611_2...|    2020-10-01 00:27:20|            1601548040|            1601504840|\n",
      "|POLYGON ((173.656...|20201001T002611_2...|    2020-10-01 00:27:06|            1601548026|            1601504826|\n",
      "|POLYGON ((173.778...|20201001T002611_2...|    2020-10-01 00:26:52|            1601548012|            1601504812|\n",
      "|POLYGON ((177.258...|20201001T002611_2...|    2020-10-01 00:27:03|            1601548023|            1601504823|\n",
      "|POLYGON ((178.511...|20201001T002611_2...|    2020-10-01 00:27:08|            1601548028|            1601504828|\n",
      "|POLYGON ((176.999...|20201001T002611_2...|    2020-10-01 00:26:30|            1601547990|            1601504790|\n",
      "+--------------------+--------------------+-----------------------+----------------------+----------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "sentinel_sedona.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3b8c7f4e-7d3f-42c6-9d8a-68e408828be7",
   "metadata": {},
   "source": [
    "## copy Sentinel data for self-intersection task"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "id": "golden-roulette",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_sedona_to_join = sentinel_sedona.withColumn('sentinel_copy_tojoin', F.unix_timestamp(\"Sentinel_date_date_type\"))\\\n",
    ".select('geometry', 's2_index', 'sentinel_copy_tojoin').withColumnRenamed('geometry', 'geometry_copy_tojoin').withColumnRenamed('s2_index', 's2_index_copy_tojoin')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "id": "binary-assumption",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_sedona.createOrReplaceTempView(\"sentinel_sedona\")\n",
    "sentinel_sedona_to_join.createOrReplaceTempView(\"sentinel_sedona_to_join\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "sophisticated-abraham",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "id": "sealed-matthew",
   "metadata": {},
   "outputs": [],
   "source": [
    "# sentinel_sedona_tosave = spark.sql('SELECT ST_AsText(geometry) as geo_wkt, * from sentinel_sedona')\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "id": "signal-firmware",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- geo_wkt: string (nullable = false)\n",
      " |-- geometry: geometry (nullable = false)\n",
      " |-- s2_index: string (nullable = true)\n",
      " |-- Sentinel_date_date_type: timestamp (nullable = true)\n",
      " |-- sentinel_max_timestamp: long (nullable = true)\n",
      " |-- sentinel_min_timestamp: long (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# sentinel_sedona_tosave.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "id": "destroyed-payroll",
   "metadata": {},
   "outputs": [],
   "source": [
    "# sentinel_sedona_tosave.select('geo_wkt', 's2_index', 'Sentinel_date_date_type', 's2_index', 'time_index_join', 'sentinel_max_timestamp', 'sentinel_min_timestamp').write.format('parquet')\\\n",
    "#         .mode(\"overwrite\")\\\n",
    "#         .option('header', True)\\\n",
    "#         .save('sentinel_sedona_dateforjoin_maxmin_wkt')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "anonymous-cliff",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "atmospheric-obligation",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "associate-national",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "declared-omega",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "prescription-audit",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "owned-bearing",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ready-manitoba",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "id": "engaging-syndicate",
   "metadata": {},
   "outputs": [],
   "source": [
    "sentinel_sedona.createOrReplaceTempView(\"sentinel_sedona\")\n",
    "sentinel_sedona_to_join.createOrReplaceTempView(\"sentinel_sedona_to_join\")\n",
    "\n",
    "test_spatial_join = spark.sql(\"SELECT * FROM sentinel_sedona, sentinel_sedona_to_join WHERE ST_Intersects(sentinel_sedona.geometry, sentinel_sedona_to_join.geometry_copy_tojoin)\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "id": "aging-supervisor",
   "metadata": {},
   "outputs": [],
   "source": [
    "# import time\n",
    "# start_spatial_time = time.time()\n",
    "# test_spatial_join.select('uuid', 'Description', 'is_timestamp').write.format('com.databricks.spark.csv')\\\n",
    "#         .mode(\"overwrite\")\\\n",
    "#         .option('header', True)\\\n",
    "#         .save('sentinel_is2_intersect_pure_spatial')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "id": "reflected-extension",
   "metadata": {},
   "outputs": [],
   "source": [
    "# with open('speed_pure_spatial_join_month.txt', 'w') as f:\n",
    "#     f.write('pure spatial join time:')\n",
    "#     f.write(str(time.time() - start_spatial_time))\n",
    "#     f.write('/n')\n",
    "#     f.write('number of spatial intersections:')\n",
    "#     f.write(str(test_spatial_join.count()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "id": "buried-colleague",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- geometry: geometry (nullable = false)\n",
      " |-- s2_index: string (nullable = true)\n",
      " |-- Sentinel_date_date_type: timestamp (nullable = true)\n",
      " |-- sentinel_max_timestamp: long (nullable = true)\n",
      " |-- sentinel_min_timestamp: long (nullable = true)\n",
      " |-- geometry_copy_tojoin: geometry (nullable = false)\n",
      " |-- s2_index_copy_tojoin: string (nullable = true)\n",
      " |-- sentinel_copy_tojoin: long (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "test_spatial_join.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "id": "relevant-diversity",
   "metadata": {},
   "outputs": [],
   "source": [
    "# test_spatial_join = test_spatial_join.cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "id": "scheduled-ancient",
   "metadata": {},
   "outputs": [],
   "source": [
    "test_ST_join = test_spatial_join.filter(F.col('sentinel_max_timestamp') >= F.col('sentinel_copy_tojoin')).filter(F.col('sentinel_min_timestamp') <= F.col('sentinel_copy_tojoin'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "id": "characteristic-transcription",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "start_temporal_time = time.time()\n",
    "test_ST_join.select('s2_index', 's2_index_copy_tojoin', 'sentinel_copy_tojoin').write.format('com.databricks.spark.csv')\\\n",
    "        .mode(\"overwrite\")\\\n",
    "        .option('header', True)\\\n",
    "        .save('sentinel_self_intersect_spatial')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "id": "better-tiffany",
   "metadata": {},
   "outputs": [],
   "source": [
    "with open('ISAT_{}_delay_S2Self_inter.txt'.format(str(hour_delay)), 'w') as f:\n",
    "    f.write('spatial first join time in total:')\n",
    "    f.write(str(time.time() - start_temporal_time))\n",
    "    f.write('\\n')\n",
    "    f.write('number of spatial intersections:')\n",
    "    f.write(str(test_ST_join.count()))\n",
    "    f.write('\\n')\n",
    "    f.write('from and to date')\n",
    "    f.write(start_date)\n",
    "    f.write('\\n')\n",
    "    f.write(end_date)\n",
    "    f.write('\\n')\n",
    "\n",
    "    f.write('Sentinel 2 data count:')\n",
    "    f.write(str(sentinel_sedona.count()))\n",
    "    \n",
    "    \n",
    "    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "pharmaceutical-thickness",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "straight-detector",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "forward-gazette",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
